/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.job;

import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_REJECTED_TASK_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTED_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_QUEUE_WAIT_TIME;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import org.apache.phoenix.util.EnvironmentEdgeManager;

import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Thread pool executor that executes scans in parallel
 * @since 0.1
 */
@SuppressWarnings("rawtypes")
public class JobManager<T> extends AbstractRoundRobinQueue<T> {

  private static final AtomicLong PHOENIX_POOL_INDEX = new AtomicLong(1);

  public JobManager(int maxSize) {
    super(maxSize, true); // true -> new producers move to front of queue; this reduces latency.
  }

  @Override
  protected Object extractProducer(T o) {
    if (o instanceof JobFutureTask) {
      return ((JobFutureTask) o).getJobId();
    }
    return o;
  }

  public static interface JobRunnable<T> extends Runnable {
    public Object getJobId();

    public TaskExecutionMetricsHolder getTaskExecutionMetric();
  }

  public static ThreadPoolExecutor createThreadPoolExec(int keepAliveMs, int size, int queueSize,
    boolean useInstrumentedThreadPool) {
    BlockingQueue<Runnable> queue;
    if (queueSize == 0) {
      queue = new SynchronousQueue<Runnable>(); // Specialized for 0 length.
    } else {
      queue = new JobManager<Runnable>(queueSize);
    }
    String name = "phoenix-" + PHOENIX_POOL_INDEX.getAndIncrement();
    ThreadFactory threadFactory =
      new ThreadFactoryBuilder().setNameFormat(name + "-thread-%s").setDaemon(true)
        .setThreadFactory(new ContextClassLoaderThreadFactory(JobManager.class.getClassLoader()))
        .build();
    ThreadPoolExecutor exec;
    if (useInstrumentedThreadPool) {
      // For thread pool, set core threads = max threads -- we don't ever want to exceed core
      // threads, but want to go up to core threads *before* using the queue.
      exec = new InstrumentedThreadPoolExecutor(name, size, size, keepAliveMs,
        TimeUnit.MILLISECONDS, queue, threadFactory) {
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> call) {
          return new InstrumentedJobFutureTask<T>(call);
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
          return new InstrumentedJobFutureTask<T>(runnable, value);
        }
      };
    } else {
      // For thread pool, set core threads = max threads -- we don't ever want to exceed core
      // threads, but want to go up to core threads *before* using the queue.
      exec = new ThreadPoolExecutor(size, size, keepAliveMs, TimeUnit.MILLISECONDS, queue,
        threadFactory) {
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> call) {
          // Override this so we can create a JobFutureTask so we can extract out the parentJobId
          // (otherwise, in the default FutureTask, it is private).
          return new JobFutureTask<T>(call);
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
          return new JobFutureTask<T>(runnable, value);
        }
      };
    }
    exec.allowCoreThreadTimeOut(true); // ... and allow core threads to time out. This just keeps
                                       // things clean when idle, and is nice for ftests modes,
                                       // etc., where we'd especially like these not to linger.
    return exec;
  }

  /**
   * Subclasses FutureTask for the sole purpose of providing {@link #getCallable()}, which is used
   * to extract the producer in the {@link JobBasedRoundRobinQueue}
   */
  static class JobFutureTask<T> extends FutureTask<T> {
    private final Object jobId;
    @Nullable
    private final TaskExecutionMetricsHolder taskMetric;

    public JobFutureTask(Runnable r, T t) {
      super(r, t);
      if (r instanceof JobRunnable) {
        this.jobId = ((JobRunnable) r).getJobId();
        this.taskMetric = ((JobRunnable) r).getTaskExecutionMetric();
      } else {
        this.jobId = this;
        this.taskMetric = null;
      }
    }

    public JobFutureTask(Callable<T> c) {
      super(c);
      // FIXME: this fails when executor used by hbase
      if (c instanceof JobCallable) {
        this.jobId = ((JobCallable<T>) c).getJobId();
        this.taskMetric = ((JobCallable<T>) c).getTaskExecutionMetric();
      } else {
        this.jobId = this;
        this.taskMetric = null;
      }
    }

    public Object getJobId() {
      return jobId;
    }
  }

  /**
   * Instrumented version of {@link JobFutureTask} that measures time spent by a task at various
   * stages in the queue and when executed.
   */
  private static class InstrumentedJobFutureTask<T> extends JobFutureTask<T> {

    /*
     * Time at which the task was submitted to the executor.
     */
    private final long taskSubmissionTime;

    // Time at which the task is about to be executed.
    private long taskExecutionStartTime;

    public InstrumentedJobFutureTask(Runnable r, T t) {
      super(r, t);
      this.taskSubmissionTime = EnvironmentEdgeManager.currentTimeMillis();
    }

    public InstrumentedJobFutureTask(Callable<T> c) {
      super(c);
      this.taskSubmissionTime = EnvironmentEdgeManager.currentTimeMillis();
    }

    @Override
    public void run() {
      this.taskExecutionStartTime = EnvironmentEdgeManager.currentTimeMillis();
      super.run();
    }

    public long getTaskSubmissionTime() {
      return taskSubmissionTime;
    }

    public long getTaskExecutionStartTime() {
      return taskExecutionStartTime;
    }

  }

  /**
   * Delegating callable implementation that preserves the parentJobId and sets up thread tracker
   * stuff before delegating to the actual command.
   */
  public static interface JobCallable<T> extends Callable<T> {
    public Object getJobId();

    public TaskExecutionMetricsHolder getTaskExecutionMetric();
  }

  /**
   * Extension of the default thread factory returned by {@code Executors.defaultThreadFactory} that
   * sets the context classloader on newly-created threads to be a specific classloader (and not the
   * context classloader of the calling thread).
   * <p/>
   * See {@link org.apache.phoenix.util.PhoenixContextExecutor} for the rationale on changing the
   * context classloader.
   */
  static class ContextClassLoaderThreadFactory implements ThreadFactory {
    private final ThreadFactory baseFactory;
    private final ClassLoader contextClassLoader;

    public ContextClassLoaderThreadFactory(ClassLoader contextClassLoader) {
      baseFactory = Executors.defaultThreadFactory();
      this.contextClassLoader = contextClassLoader;
    }

    @Override
    public Thread newThread(Runnable r) {
      Thread t = baseFactory.newThread(r);
      t.setContextClassLoader(contextClassLoader);
      return t;
    }
  }

  /**
   * Thread pool executor that instruments the various characteristics of the backing pool of
   * threads and queue. This executor assumes that all the tasks handled are of type
   * {@link JobManager.InstrumentedJobFutureTask}
   */
  private static class InstrumentedThreadPoolExecutor extends ThreadPoolExecutor {

    private final RejectedExecutionHandler rejectedExecHandler = new RejectedExecutionHandler() {
      @Override
      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        TaskExecutionMetricsHolder metrics = getRequestMetric(r);
        if (metrics != null) {
          metrics.getNumRejectedTasks().increment();
        }
        GLOBAL_REJECTED_TASK_COUNTER.increment();
        throw new RejectedExecutionException(
          "Task " + r.toString() + " rejected from " + executor.toString());
      }
    };

    public InstrumentedThreadPoolExecutor(String threadPoolName, int corePoolSize,
      int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
      ThreadFactory threadFactory) {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
      setRejectedExecutionHandler(rejectedExecHandler);
    }

    @Override
    public void execute(Runnable task) {
      TaskExecutionMetricsHolder metrics = getRequestMetric(task);
      if (metrics != null) {
        metrics.getNumTasks().increment();
      }
      GLOBAL_TASK_EXECUTED_COUNTER.increment();
      super.execute(task);
    }

    @Override
    protected void beforeExecute(Thread worker, Runnable task) {
      InstrumentedJobFutureTask instrumentedTask = (InstrumentedJobFutureTask) task;
      long queueWaitTime =
        EnvironmentEdgeManager.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime();
      GLOBAL_TASK_QUEUE_WAIT_TIME.update(queueWaitTime);
      TaskExecutionMetricsHolder metrics = getRequestMetric(task);
      if (metrics != null) {
        metrics.getTaskQueueWaitTime().change(queueWaitTime);
      }
      super.beforeExecute(worker, instrumentedTask);
    }

    @Override
    protected void afterExecute(Runnable task, Throwable t) {
      InstrumentedJobFutureTask instrumentedTask = (InstrumentedJobFutureTask) task;
      try {
        super.afterExecute(instrumentedTask, t);
      } finally {
        long taskExecutionTime =
          EnvironmentEdgeManager.currentTimeMillis() - instrumentedTask.getTaskExecutionStartTime();
        long endToEndTaskTime =
          EnvironmentEdgeManager.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime();
        TaskExecutionMetricsHolder metrics = getRequestMetric(task);
        if (metrics != null) {
          metrics.getTaskExecutionTime().change(taskExecutionTime);
          metrics.getTaskEndToEndTime().change(endToEndTaskTime);
        }
        GLOBAL_TASK_EXECUTION_TIME.update(taskExecutionTime);
        GLOBAL_TASK_END_TO_END_TIME.update(endToEndTaskTime);
      }
    }

    private static TaskExecutionMetricsHolder getRequestMetric(Runnable task) {
      return ((JobFutureTask) task).taskMetric;
    }
  }
}
